8.1 并发的含义

在开始本章之前,需要了解并发(concurrency)和并行(parallesim)的区别。

  • 并发:逻辑上具备同时处理多个任务的能力。
  • 并行:物理上在同一时刻执行多个并发任务。

我们通常会说程序是并发设计的,也就是说它允许多个任务同时执行,但实际上并不一定真在同一时刻发生。在单核处理器上,它们能以间隔方式切换执行。而并行则依赖多核处理器等物理设备,让多个任务真正在同一时刻执行,它代表了当前程序运行状态。简单点说,并行是并发设计的理想执行模式。

Concurrency is not parallelism:Different concurrent designs enable different ways to parallelize.

多线程或多进程是并行的基本条件,但单线程也可用协程(coroutine)做到并发。尽管协程在单个线程上通过主动切换来实现多任务并发,但它也有自己的优势。除了将因阻塞而浪费的时间找回来外,还免去了线程切换开销,有着不错的执行效率。协程上运行的多个任务本质上是依旧串行的,加上可控自主调度,所以并不需要做同步处理。

即便采用多线程也未必就能并行。Python就因GIL限制,默认只能并发而不能并行,所以很多时候转而使用“多进程+协程”架构。

很难说哪种方式更好一些,它们有各自适用的场景。通常情况下,用多进程来实现分布式和负载平衡,减轻单进程垃圾回收压力;用多线程(LWP)抢夺更多的处理器资源;用协程来提高处理器时间片利用率。

简单将goroutine归纳为协程并不合适。运行时会创建多个线程来执行并发任务,且任务单元可被调度到其他线程并行执行。这更像是多线程和协程的综合体,能最大限度提升执行效率,发挥多核处理能力。

更多实现细节,请阅读本书下卷《源码剖析》。

只须在函数调用前添加go关键字即可创建并发任务。

go println(“hello,world!”)

go func(s string) { println(s) }(“hello,world!“)

注意是函数调用,所以必须提供相应的参数。

关键字go并非执行并发操作,而是创建一个并发任务单元。新建任务被放置在系统队列中,等待调度器安排合适系统线程去获取执行权。当前流程不会阻塞,不会等待该任务启动,且运行时也不保证并发任务的执行次序。

每个任务单元除保存函数指针、调用参数外,还会分配执行所需的栈内存空间。相比系统默认MB级别的线程栈,goroutine自定义栈初始仅须2 KB,所以才能创建成千上万的并发任务。自定义栈采取按需分配策略,在需要时进行扩容,最大能到GB规模。

在不同版本中,自定义栈大小略有不同。如未做说明,本书特指1.6 amd64。

与defer一样,goroutine也会因“延迟执行”而立即计算并复制执行参数。

var c int

func counter()int{ c++ return c }

func main() { a:=100

go func(x,y int) { time.Sleep(time.Second) // 让goroutine在main逻辑之后执行 println(“go:“,x,y) }(a,counter()) // 立即计算并复制参数

a+=100 println(“main:“,a,counter())

time.Sleep(time.Second*3) // 等待goroutine结束 }

输出:

main:200 2 go:100 1

Wait

进程退出时不会等待并发任务结束,可用通道(channel)阻塞,然后发出退出信号。

func main() { exit:=make(chan struct{}) // 创建通道。因为仅是通知,数据并没有实际意义

go func() { time.Sleep(time.Second) println(“goroutine done.”)

   close(exit)           // 关闭通道,发出信号 
}() 

println(“main…”) exit // 如通道关闭,立即解除阻塞 println(“main exit.“) }

输出:

main… goroutine done. main exit.

除关闭通道外,写入数据也可解除阻塞。channel的更多信息,后面再做详述。

如要等待多个任务结束,推荐使用sync.WaitGroup。通过设定计数器,让每个goroutine在退出前递减,直至归零时解除阻塞。

import( “sync” “time” )

func main() { var wg sync.WaitGroup

for i:=0;i<10;i++ { wg.Add(1) // 累加计数

   go func(id int) { 
       defer wg.Done()          // 递减计数 

       time.Sleep(time.Second) 
       println("goroutine",id, "done.") 
    }(i) 
} 

println(“main…”) wg.Wait() // 阻塞,直到计数归零 println(“main exit.“) }

输出:

main… goroutine 9 done. goroutine 4 done. goroutine 2 done. goroutine 6 done. goroutine 8 done. goroutine 3 done. goroutine 5 done. goroutine 1 done. goroutine 0 done. goroutine 7 done. main exit.

尽管WaitGroup.Add实现了原子操作,但建议在goroutine外累加计数器,以免Add尚未执行,Wait已经退出。

func main() { var wg sync.WaitGroup

go func() { wg.Add(1) // 来不及设置 println(“hi!“) }()

wg.Wait() println(“exit.“) }

可在多处使用Wait阻塞,它们都能接收到通知。

func main() { var wg sync.WaitGroup wg.Add(1)

go func() { wg.Wait() // 等待归零,解除阻塞 println(“wait exit.“) }()

go func() { time.Sleep(time.Second) println(“done.”) wg.Done() // 递减计数 }()

wg.Wait() // 等待归零,解除阻塞 println(“main exit.“) }

输出:

done. wait exit. main exit.

GOMAXPROCS

运行时可能会创建很多线程,但任何时候仅有限的几个线程参与并发任务执行。该数量默认与处理器核数相等,可用runtime.GOMAXPROCS函数(或环境变量)修改。

如参数小于1,GOMAXPROCS仅返回当前设置值,不做任何调整。

import( “math” “runtime” “sync” )

// 测试目标函数 func count() { x:=0 for i:=0;i<math.MaxUint32;i++ { x+=i }

println(x) }

// 循环执行 func test(n int) { for i:=0;i<n;i++ { count() } }

// 并发执行 func test2(n int) { var wg sync.WaitGroup wg.Add(n)

for i:=0;i<n;i++ { go func() { count() wg.Done() }() }

wg.Wait() }

func main() { n:=runtime.GOMAXPROCS(0) test(n) //test2(n) }

输出:

$time./test

9223372030412324865 9223372030412324865 9223372030412324865 9223372030412324865

real 0m8.395s user 0m8.281s sys 0m0.056s

$time./test2

9223372030412324865 9223372030412324865 9223372030412324865 9223372030412324865

real 0m3.907s // 程序实际执行时间 user 0m14.438s // 多核执行时间累加 sys 0m0.041s

该测试机器是4核,可用runtime.NumCPU函数返回。

Local Storage

与线程不同,goroutine任务无法设置优先级,无法获取编号,没有局部存储(TLS),甚至连返回值都会被抛弃。但除优先级外,其他功能都很容易实现。

func main() { var wg sync.WaitGroup var gs[5]struct{ // 用于实现类似TLS功能 id int // 编号 result int // 返回值 }

for i:=0;i<len(gs);i++ { wg.Add(1)

   go func(id int) {          // 使用参数避免闭包延迟求值 
       defer wg.Done() 

       gs[id].id=id
       gs[id].result= (id+1) *100
    }(i) 
} 

wg.Wait() fmt.Printf(”%+v\n”,gs) }

输出:

{id:0 result:100} {id:1 result:200} {id:2 result:300} {id:3 result:400} {id:4 result:500}

如使用map作为局部存储容器,建议做同步处理,因为运行时会对其做并发读写检查。

Gosched

暂停,释放线程去执行其他任务。当前任务被放回队列,等待下次调度时恢复执行。

func main() { runtime.GOMAXPROCS(1) exit:=make(chan struct{})

go func() { // 任务a defer close(exit)

   go func() {            // 任务b。放在此处,是为了确保a优先执行 
       println("b") 
    }() 

   for i:=0;i<4;i++ { 
       println("a:",i) 

       if i==1{     // 让出当前线程,调度执行b
           runtime.Gosched() 
        } 
    } 
}() 

<-exit

}

输出:

a:0 a:1 b a:2 a:3

该函数很少被使用,因为运行时会主动向长时间运行(10 ms)的任务发出抢占调度。只是当前版本实现的算法稍显粗糙,不能保证调度总能成功,所以主动切换还有适用场合。

Goexit

Goexit立即终止当前任务,运行时确保所有已注册延迟调用被执行。该函数不会影响其他并发任务,不会引发panic,自然也就无法捕获。

func main() { exit:=make(chan struct{})

go func() { defer close(exit) // 执行 defer println(“a”) // 执行

   func() { 
       defer func() { 
           println("b",recover() ==nil)      // 执行,recover返回nil
        }() 

       func() {                    // 在多层调用中执行Goexit
           println("c") 
           runtime.Goexit()              // 立即终止整个调用堆栈 
           println("c done.")         // 不会执行 
        }() 

       println("b done.")               // 不会执行 
    }() 

   println("a done.")             // 不会执行 
}() 

<-exit

println(“main exit.“) }

输出:

c b true a main exit.

如果在main.main里调用Goexit,它会等待其他任务结束,然后让进程直接崩溃。

func main() { for i:=0;i<2;i++ {
go func(x int) { for n:=0;n<2;n++ { fmt.Printf(“%c: %d\n”, ‘a’+x,n) time.Sleep(time.Millisecond) } }(i) }

runtime.Goexit() // 等待所有任务结束 println(“main exit.“) }

输出:

b:0 a:0 b:1 a:1 fatal error:no goroutines(main called runtime.Goexit) -deadlock!

无论身处哪一层,Goexit都能立即终止整个调用堆栈,这与return仅退出当前函数不同。

标准库函数os.Exit可终止进程,但不会执行延迟调用。